-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Integrate coreum parallel lib #55
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 15 of 15 files at r1, all commit messages.
Reviewable status: all files reviewed, 5 unresolved discussions (waiting on @dzmitryhil, @miladz68, and @ysv)
relayer/logger/logger.go
line 37 at r1 (raw file):
Warn(ctx context.Context, msg string, fields ...Field) Error(ctx context.Context, msg string, fields ...Field) ParallelLogger(ctx context.Context) ParallelLogger
Why is it needed? It doesn't look like a legit logger interface now. Accepting different logging interface in every library and then putting adapters everywhere is a nightmare. I don't think we need to create independent logging interfaces everywhere. All the products are controlled by us so we could agree on sth. In the system where I took parallel
from, we agreed that zap
is used everywhere and it's available inside the context. I agree that it might be passed explicitly but then let's agree on some interface at least.
relayer/processes/processor.go
line 50 at r1 (raw file):
} pg := parallel.NewGroup(ctx, parallel.WithGroupLogger(p.log.ParallelLogger(ctx)))
The idea of parallel
is to use parallel.Run
wherever possible. Group was exposed just for some edge cases. Here you may use Run
.
relayer/processes/processor.go
line 63 at r1 (raw file):
func (p *Processor) startProcessWithRestartOnError(ctx context.Context, process ProcessWithOptions) error { for {
Here you use parallel
to start single goroutine, but you don't need a gorouitne at all. Whatever you do inside goroutine you may do inside for
loop directly.
relayer/processes/processor.go
line 66 at r1 (raw file):
// spawn one independent task to handle the panics properly err := parallel.Run(ctx, func(ctx context.Context, spawnFn parallel.SpawnFn) error { spawnFn(process.Name, parallel.Continue, func(ctx context.Context) error {
spawnFn(process.Name, parallel.Continue, process.Process.Start)
relayer/xrpl/scanner.go
line 76 at r1 (raw file):
} pg := parallel.NewGroup(ctx, parallel.WithGroupLogger(s.log.ParallelLogger(ctx)))
pg
is not awaited at the end. And here you may use Run
again. That's why it should get a priority because by using Run
you have no option to forget about awaiting.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 15 of 15 files at r1, all commit messages.
Reviewable status: all files reviewed, 5 unresolved discussions (waiting on @dzmitryhil, @miladz68, and @wojtek-coreum)
relayer/logger/logger.go
line 37 at r1 (raw file):
Previously, wojtek-coreum (Wojtek) wrote…
Why is it needed? It doesn't look like a legit logger interface now. Accepting different logging interface in every library and then putting adapters everywhere is a nightmare. I don't think we need to create independent logging interfaces everywhere. All the products are controlled by us so we could agree on sth. In the system where I took
parallel
from, we agreed thatzap
is used everywhere and it's available inside the context. I agree that it might be passed explicitly but then let's agree on some interface at least.
That's interesting thought. I was always trying to make separate interfaces but I agree that it becomes too much at some point.
Maybe We can for example define single interface in coreum-tools and use it everywhere
But I don't have strong opinion on this
relayer/processes/processor.go
line 63 at r1 (raw file):
Previously, wojtek-coreum (Wojtek) wrote…
Here you use
parallel
to start single goroutine, but you don't need a gorouitne at all. Whatever you do inside goroutine you may do insidefor
loop directly.
do you mean to move prallel.Run outside of for ?
parallel.Run().. {
for {
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 14 of 15 files reviewed, 5 unresolved discussions (waiting on @miladz68, @wojtek-coreum, and @ysv)
relayer/logger/logger.go
line 37 at r1 (raw file):
Previously, ysv (Yaroslav Savchuk) wrote…
That's interesting thought. I was always trying to make separate interfaces but I agree that it becomes too much at some point.
Maybe We can for example define single interface in coreum-tools and use it everywhere
But I don't have strong opinion on this
If you still want to go with the structured login, it woun't work. We integrate 3d party lib (for the project) and we should either follow it's rule or don't use the lib or it's features. Here if you don't like that logger, we can't simply don't use it at all for the parallel, so the parallel will use the NoOpLogger
.
relayer/processes/processor.go
line 50 at r1 (raw file):
Previously, wojtek-coreum (Wojtek) wrote…
The idea of
parallel
is to useparallel.Run
wherever possible. Group was exposed just for some edge cases. Here you may useRun
.
Ok, how can we update the current code to archive that?
relayer/processes/processor.go
line 63 at r1 (raw file):
Previously, ysv (Yaroslav Savchuk) wrote…
do you mean to move prallel.Run outside of for ?
parallel.Run().. { for { } }
I did it to simplify the panic handling, but agree, better to do it differently not to confuse.
relayer/processes/processor.go
line 66 at r1 (raw file):
Previously, wojtek-coreum (Wojtek) wrote…
spawnFn(process.Name, parallel.Continue, process.Process.Start)
Done.
relayer/xrpl/scanner.go
line 76 at r1 (raw file):
Previously, wojtek-coreum (Wojtek) wrote…
pg
is not awaited at the end. And here you may useRun
again. That's why it should get a priority because by usingRun
you have no option to forget about awaiting.
That's intentional. We start the scanner in the background.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 1 files at r2, all commit messages.
Reviewable status: all files reviewed, 5 unresolved discussions (waiting on @miladz68 and @wojtek-coreum)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 1 files at r2, all commit messages.
Reviewable status: all files reviewed, 3 unresolved discussions (waiting on @dzmitryhil, @miladz68, and @ysv)
relayer/logger/logger.go
line 37 at r1 (raw file):
Previously, dzmitryhil (Dzmitry Hil) wrote…
If you still want to go with the structured login, it woun't work. We integrate 3d party lib (for the project) and we should either follow it's rule or don't use the lib or it's features. Here if you don't like that logger, we can't simply don't use it at all for the parallel, so the parallel will use the
NoOpLogger
.
You already defined this:
Info(ctx context.Context, msg string, fields ...Field)
Warn(ctx context.Context, msg string, fields ...Field)
Error(ctx context.Context, msg string, fields ...Field)
Parallel could accept just this, or logger implementing Debug(ctx context.Context, msg string, fields ...Field)
.
But having ParallelLogger(ctx context.Context) ParallelLogger
function in logger is horrible.
relayer/processes/processor.go
line 50 at r1 (raw file):
Previously, dzmitryhil (Dzmitry Hil) wrote…
Ok, how can we update the current code to archive that?
parallel.Run(ctx, spawnFn parallel.SpawnFn) error {
for i := range processes {
process := processes[i]
spawnFn(process.Name, parallel.Continue, func(ctx context.Context) error {
ctx = tracing.WithTracingProcess(ctx, process.Name)
return p.startProcessWithRestartOnError(ctx, process)
})
}
return nil
}
relayer/xrpl/scanner.go
line 76 at r1 (raw file):
Previously, dzmitryhil (Dzmitry Hil) wrote…
That's intentional. We start the scanner in the background.
But then, you silently ignore all the panics and errors which might happen inside the goroutine. The parallel
has been created exactly for that - to handle errors, not ignore them. It means that:
- goroutines are started
- they are awaited
- error is handled (logged and ignored, or propagated to the upper call depending on usecase).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: all files reviewed, 3 unresolved discussions (waiting on @miladz68 and @wojtek-coreum)
relayer/logger/logger.go
line 37 at r1 (raw file):
Previously, wojtek-coreum (Wojtek) wrote…
You already defined this:
Info(ctx context.Context, msg string, fields ...Field) Warn(ctx context.Context, msg string, fields ...Field) Error(ctx context.Context, msg string, fields ...Field)
Parallel could accept just this, or logger implementing
Debug(ctx context.Context, msg string, fields ...Field)
.
But havingParallelLogger(ctx context.Context) ParallelLogger
function in logger is horrible.
Let's discuss on the daily call.
relayer/processes/processor.go
line 50 at r1 (raw file):
Previously, wojtek-coreum (Wojtek) wrote…
parallel.Run(ctx, spawnFn parallel.SpawnFn) error { for i := range processes { process := processes[i] spawnFn(process.Name, parallel.Continue, func(ctx context.Context) error { ctx = tracing.WithTracingProcess(ctx, process.Name) return p.startProcessWithRestartOnError(ctx, process) }) } return nil }
Got you now, updated.
relayer/xrpl/scanner.go
line 76 at r1 (raw file):
Previously, wojtek-coreum (Wojtek) wrote…
But then, you silently ignore all the panics and errors which might happen inside the goroutine. The
parallel
has been created exactly for that - to handle errors, not ignore them. It means that:
- goroutines are started
- they are awaited
- error is handled (logged and ignored, or propagated to the upper call depending on usecase).
Actually - not, we have logger which handles it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 14 of 15 files reviewed, 3 unresolved discussions (waiting on @miladz68, @wojtek-coreum, and @ysv)
relayer/xrpl/scanner.go
line 76 at r1 (raw file):
Previously, dzmitryhil (Dzmitry Hil) wrote…
Actually - not, we have logger which handles it.
I can pass the spawn func to the scanner from the processor and call it, does it make sense?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 1 files at r3, all commit messages.
Reviewable status: all files reviewed, 3 unresolved discussions (waiting on @miladz68 and @wojtek-coreum)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 1 files at r3, all commit messages.
Reviewable status: all files reviewed, 3 unresolved discussions (waiting on @dzmitryhil and @miladz68)
relayer/xrpl/scanner.go
line 76 at r1 (raw file):
Previously, dzmitryhil (Dzmitry Hil) wrote…
I can pass the spawn func to the scanner from the processor and call it, does it make sense?
yep
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 13 of 19 files reviewed, 2 unresolved discussions (waiting on @miladz68, @wojtek-coreum, and @ysv)
relayer/logger/logger.go
line 37 at r1 (raw file):
Previously, dzmitryhil (Dzmitry Hil) wrote…
Let's discuss on the daily call.
Agreed to implement in separate PR.
relayer/xrpl/scanner.go
line 76 at r1 (raw file):
Previously, wojtek-coreum (Wojtek) wrote…
yep
Updated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 6 of 6 files at r4, all commit messages.
Reviewable status: all files reviewed, 2 unresolved discussions (waiting on @miladz68 and @wojtek-coreum)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 6 of 6 files at r4, all commit messages.
Reviewable status: all files reviewed, 5 unresolved discussions (waiting on @dzmitryhil and @miladz68)
integration-tests/xrpl/scanner_test.go
line 113 at r4 (raw file):
txsCh := make(chan rippledata.TransactionWithMetaData, txsCount) require.NoError(t, parallel.Run(ctx, func(ctx context.Context, spawn parallel.SpawnFn) error {
minor: usually in tests, groups are used instead of parallel.Run
. Because you may cancel them using t.Cleanup()
.
And you may run tested logic in the group, and testing logic directly in the top function - this allows you to use require.*
functions.
integration-tests/xrpl/scanner_test.go
line 177 at r4 (raw file):
// validate that we have all sent hashed and no duplicated hash := tx.GetHash().String() _, found := expectedHashes[hash]
minor: you may do it inside if
relayer/processes/xrpl_tx_observer.go
line 71 at r4 (raw file):
return o.txScanner.ScanTxs(ctx, txCh) }) spawn("tx-processor", parallel.Continue, func(ctx context.Context) error {
parallel.Continue -> parallel.Fail, because this function should exit only if the other goroutine finishes.
relayer/processes/xrpl_tx_observer.go
line 81 at r4 (raw file):
} } return nil
return errors.WithStack(ctx.Err())
relayer/xrpl/scanner.go
line 168 at r4 (raw file):
continue } select {
Correct code is:
select {
case <-ctx.Done():
return lastLedger
case ch <- *tx:
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: all files reviewed, 4 unresolved discussions (waiting on @miladz68 and @wojtek-coreum)
integration-tests/xrpl/scanner_test.go
line 113 at r4 (raw file):
Previously, wojtek-coreum (Wojtek) wrote…
minor: usually in tests, groups are used instead of
parallel.Run
. Because you may cancel them usingt.Cleanup()
.
And you may run tested logic in the group, and testing logic directly in the top function - this allows you to userequire.*
functions.
Actually you won't be able to run the require.*
in the spaw func, which might be needed. Sofor my use-cases the run fits better.
integration-tests/xrpl/scanner_test.go
line 177 at r4 (raw file):
Previously, wojtek-coreum (Wojtek) wrote…
minor: you may do it inside
if
Done
relayer/processes/xrpl_tx_observer.go
line 71 at r4 (raw file):
Previously, wojtek-coreum (Wojtek) wrote…
parallel.Continue -> parallel.Fail, because this function should exit only if the other goroutine finishes.
Agree, updated
relayer/processes/xrpl_tx_observer.go
line 81 at r4 (raw file):
Previously, wojtek-coreum (Wojtek) wrote…
return errors.WithStack(ctx.Err())
Done.
relayer/xrpl/scanner.go
line 168 at r4 (raw file):
Previously, wojtek-coreum (Wojtek) wrote…
Correct code is:
select { case <-ctx.Done(): return lastLedger case ch <- *tx: }
Updated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 3 of 3 files at r5, all commit messages.
Reviewable status: all files reviewed, 1 unresolved discussion (waiting on @dzmitryhil, @miladz68, and @ysv)
integration-tests/xrpl/scanner_test.go
line 113 at r4 (raw file):
Previously, dzmitryhil (Dzmitry Hil) wrote…
Actually you won't be able to run the
require.*
in the spaw func, which might be needed. Sofor my use-cases the run fits better.
that's why I said that you may run tested code in group, and testing code outside the group, where you can use require.*
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 12 of 15 files at r1, 1 of 1 files at r3, 3 of 6 files at r4, 3 of 3 files at r5, all commit messages.
Reviewable status: all files reviewed, 1 unresolved discussion (waiting on @dzmitryhil and @ysv)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: all files reviewed, 1 unresolved discussion (waiting on @dzmitryhil and @ysv)
# Conflicts: # integration-tests/xrpl/scanner_test.go
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 3 of 3 files at r5, all commit messages.
Reviewable status: all files reviewed, 1 unresolved discussion (waiting on @dzmitryhil)
5710fa2
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 11 of 11 files at r6, all commit messages.
Reviewable status: all files reviewed, 1 unresolved discussion (waiting on @dzmitryhil)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 11 of 11 files at r6, all commit messages.
Reviewable status: all files reviewed, 1 unresolved discussion (waiting on @dzmitryhil)
Description
Integrate coreum parallel lib
Reviewers checklist:
Authors checklist
This change is